Setup
Before starting pyspark, run:
export SPARK_DRIVER_MEMORY=20g
In this segment we load the dataset and keep only a tiny subset of it.
In [1]:
# Load the WDC pay-level-domain (PLD) edge list; each line is "source<TAB>destination"
# (see the split('\t') parse below).
data = sc.textFile('datasets/wdc/split-pld-arc/')
In [2]:
# Cutoff on node ids: keep only edges whose endpoints are both below `size`.
size = 2e7
small = data \
    .map(lambda line: [int(field) for field in line.split('\t')]) \
    .filter(lambda edge: edge[0] < size and edge[1] < size)
In [3]:
# Cache the filtered edge RDD: it is reused by every iteration below.
small.cache()
Out[3]:
Reading all of the PLD graph takes 300 seconds. Disk read time is not the issue here. This time is necessary even when the whole 11 GB of the dataset is cached in memory.
For $size = 2e7$ this step takes 253sec (part=367, vecPart=64)
In [ ]:
# Force evaluation of the (cached) filtered RDD and count the surviving edges.
small.count()
In [ ]:
# Peek at the first 10 edges as [source, destination] pairs.
small.take(10)
Retrieving one edge uniformly at random takes 34 seconds.
Alternative implementation: partition A by row, then simply sum the rows to obtain the in-degree (using partitionBy()).
In [ ]:
# Number of partitions used for the rank-vector RDDs.
vectorPart = 64
In [ ]:
# Initial vector x1: in-degree of every destination node —
# emit one (dest, 1) pair per edge and sum per key.
x1 = small \
    .map(lambda edge: (edge[1], 1)) \
    .reduceByKey(add, numPartitions=vectorPart)
In [ ]:
# Fix: `normalizevalue` is not defined anywhere in this notebook — the helper
# defined below (and used for x2/x3) is `normalizevalues`; the old call raised
# a NameError.
x1 = normalizevalues(x1)
x1.cache()
The first iteration takes 282 seconds.
For $size = 2e7$ this step takes 57sec (vecPart=64)
In [ ]:
#x1.saveAsTextFile('datasets/wdc/results/full-Iteration1')
In [ ]:
#approxjoin = x1.filter(lambda (k,v): v>2).join(small)
#x2 = approxjoin.map(lambda (k,v): (v[1], v[0])).reduceByKey(add)
#x2.take(5)
For $size = 1e7$ this step takes 48sec (part=367, vecPart=64)
For $size = 2e7$ this step takes 148sec (part=367, vecPart=64)
In [ ]:
def exactiteration(A, x, part=vectorPart):
    """One exact power-iteration step: push each node's juice along its edges.

    A    -- edge RDD of (source, dest) pairs
    x    -- vector RDD of (node, juice) pairs
    part -- number of partitions for the resulting vector.
            Bug fix: this parameter was previously accepted but ignored;
            the body hard-coded the global `vectorPart` instead.
    """
    return x.join(
        A
    ).map(
        # join yields (src, (juice, dest)); route the juice to dest
        lambda pair: (pair[1][1], pair[1][0])
    ).reduceByKey(
        add,
        numPartitions=part
    )
In [ ]:
def normalizevalues(x):
    """Scale the values of the (key, value) RDD `x` so they sum to 1."""
    total = float(x.values().sum())
    return x.mapValues(lambda value: value / total)
In [ ]:
# Iteration 2: propagate the x1 vector along the edges of `small`.
x2 = exactiteration(small, x1, part=vectorPart)
In [ ]:
# Renormalize so the vector sums to 1.
x2 = normalizevalues(x2)
In [ ]:
# Keep x2 in memory; the next iteration joins against it.
x2.cache()
In [ ]:
# Sanity check: look at 10 (node, juice) entries of x2.
x2.take(10)
In [ ]:
# Iteration 3.
x3 = exactiteration(small, x2, part=vectorPart)
In [ ]:
# Renormalize so the vector sums to 1.
x3 = normalizevalues(x3)
In [ ]:
# Keep x3 in memory for the next iteration.
x3.cache()
In [ ]:
# Iteration 4.
x4 = exactiteration(small, x3, part=vectorPart)
In [ ]:
# Fix: the helper is named `normalizevalues` (as used for x2/x3);
# `normalizevalue` is undefined and raised a NameError.
x4 = normalizevalues(x4)
x4.cache()
In [ ]:
# Sanity check: look at 10 (node, juice) entries of x4.
x4.take(10)
In [ ]:
# Iteration 5. Fix: call `normalizevalues` (the defined helper);
# `normalizevalue` is an undefined name.
x5 = exactiteration(small, x4, part=vectorPart)
x5 = normalizevalues(x5)
x5.cache()
In [ ]:
# Iteration 6. Fix: call `normalizevalues` (the defined helper);
# `normalizevalue` is an undefined name.
x6 = exactiteration(small, x5, part=vectorPart)
x6 = normalizevalues(x6)
x6.cache()
In [ ]:
# Load the PLD index mapping domain names to node ids
# (one "name<TAB>id" record per line; parsed by indexline below).
names = sc.textFile('datasets/wdc/pld-index')
In [ ]:
def indexline(line):
    """Parse a pld-index record "name<TAB>id" into an (id, name) pair."""
    fields = line.split('\t')
    return (int(fields[1]), fields[0])
In [ ]:
# (id, name) pairs, restricted to the node ids kept in `small`.
index = names.map(indexline).filter(lambda (k,v): k<size)
In [ ]:
# Cache the id->name index for the joins below.
index.cache()
In [ ]:
# Attach domain names to the nodes of x4 whose score exceeds 1e-4;
# result entries are (id, (name, score)).
topnodes = index.join(x4.filter(lambda (k,v): v>0.0001))
In [ ]:
# Cache: topnodes is inspected several times below.
topnodes.cache()
In [ ]:
# Peek at 10 (id, (name, score)) entries.
topnodes.take(10)
In [ ]:
# Top 10 entries by score, descending (v is the (name, score) pair, so v[1] is the score).
topnodes.sortBy(lambda (k,v): v[1], ascending=False).take(10)
In [ ]:
# Bare expression: the notebook displays the RDD object's repr (no computation triggered).
topnodes
In [ ]: